Cluster Linking

Cluster LinkingConfluent Platform提供的一种功能,用于将多个Kafka集群连接在一起。该功能允许不同的Kafka集群之间进行数据的镜像和复制。Cluster Linking将在数据目标(Destination)集群启动,并复制数据源(Source)集群的数据到目标集群。本文将向您介绍如何使用云消息队列 Confluent 版Cluster Linking。主要包括如何远程使用Confluent CLI客户端方式创建Cluster Linking以及Cluster Linking的基本管理。

前提条件

  • 已经准备好数据源集群和数据目标集群。

  • 已经准备好用于连接Source集群与Destination集群的机器。本文以ECS为例,实例创建和使用,请参见通过控制台使用ECS实例(快捷版)

    • 安装了Confluent Platform 7.0.0及更高版本客户端,更多信息,请参见Confluent

    • 安装Java 811。更多信息,请参见安装JDK

配置文件

ECS实例中新建配置文件,用于连接Source集群与Destination集群。请将下文注意示例代码中的<username><password><source-cluster-address:port>替换为您本地的配置

  1. 创建连接Source集群的配置文件/tmp/source.config,并按照以下配置打开自动创建Mirror Topic、同步Consumer Group、同步ACL用户开关。

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";
    bootstrap.servers=<source-cluster-address:port>
    auto.create.mirror.topics.enable=true
    consumer.offset.sync.enable=true
    acl.sync.enable=true
  2. 创建连接Destination集群的配置文件/tmp/destination.config

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";

测试数据准备

使用Confluent Platform CLI执行相关命令,在Source集群上准备测试数据。请将示例中的<source-cluster-address:port>替换为您本地的配置。

  1. 执行以下命令,在Source集群上创建一个具有单分区的待镜像的Topic,以便更容易观察复制消息的顺序。

    kafka-topics --create --topic test-topic --partitions 1 \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config

    使用list topicdescribe topic命令查看Topic详情。

    #list topic
    kafka-topics --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
    
    #describe topic
    kafka-topics --describe --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config
  2. 执行以下命令,向Source集群上的test-topic发送消息。

    seq 1 5 | kafka-console-producer --topic test-topic \
    --bootstrap-server <source-cluster-address:port> \
    --producer.config /tmp/source.config
  3. 消费Source集群中test-topic上的数据并指定Consumer Group。

    # consume
    kafka-console-consumer --topic test-topic ---beginning \
    --bootstrap-server <source-cluster-address:port> --group test-group \
    --consumer.config /tmp/source.config
    
    # list consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> --list \
    --command-config /tmp/source.config
    
    # describe offsets of consumer groups
    kafka-consumer-groups --bootstrap-server <source-cluster-address:port> \
    --group test-group --describe --offsets \
    --command-config /tmp/source.config

    如果成功消费消息,您的输出将是:

    1

    2

    3

    4

    5

  4. 新增ACL用户并赋予可写权限。

    # add user and write permission
    kafka-acls --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config  --add --allow-principal User:test-user \
    --operation READ --topic test-topic
      
    # list
    kafka-acls --list --bootstrap-server <source-cluster-address:port> \
    --command-config /tmp/source.config

数据同步

本示例假设Source集群和Destination集群需要用SASL_SSL的方式登录集群,并且假设连接集群的时候,使用了证书进行域名校验,请注意替换以下示例代码中的<source-cluster-address:port><destination-cluster-address:port>

  1. 创建选择迁移Topic的配置文件/tmp/topic_filter.json

    { 
      "topicFilters": [ 
        {
          "name": "test-topic",  
          "patternType": "LITERAL",  
          "filterType": "INCLUDE"
        } 
      ]
    }
  2. 创建选择迁移消费者组的配置文件/tmp/group.json

    {
      "groupFilters": [
        {
          "name": "test-group",
          "patternType": "LITERAL",
          "filterType": "INCLUDE"
        }
      ]
    }
  3. 创建选择迁移ACL权限的配置文件/tmp/acl.json

    {
      "aclFilters": [
        {
          "resourceFilter": {
            "resourceType": "any",
            "patternType": "any"
          },
          "accessFilter": {
            "operation": "any",
            "permissionType": "any"
          }
        }
      ]
    }
  4. 创建Cluster Linking并复制Topics、Consumer Groups、ACL用户权限。

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config --create --link test-cluster-link \
    --config-file /tmp/source.config \
    --topic-filters-json-file /tmp/topic_filter.json \
    --consumer-group-filters-json-file /tmp/group.json \
    --acl-filters-json-file /tmp/acl.json
  5. 待数据同步完成后,修改Mirror Topic的状态为promote,使Mirror Topic可读写。此时,Mirror Topic不再从源Topic同步消息。

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

迁移测试

完成上述数据同步的步骤后,可以通过以下操作验证迁移是否成功。

  1. 查看Destination集群Topic、Consumer Group、ACL用户权限是否同步。

    # list topic
    kafka-topics --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
    
    # list consumer group
    kafka-consumer-groups --bootstrap-server <destination-cluster-address:port> \
    --list --command-config /tmp/destination.config
    
    # list acl
    kafka-acls --list --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  2. 验证Topic生产、消费是否正常。

    # produce
    kafka-console-producer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --producer.config /tmp/destination.config
    
    # consume
    kafka-console-consumer --topic test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --consumer.config /tmp/destination.config

Cluster Linking管理

本节描述如何管理已创建的Cluster Linking。请您注意替换示例代码中的<destination-cluster-address:port>

  1. 执行以下命令,查看Cluster Linking列表。

    kafka-cluster-links --bootstrap-server <destination-cluster-address:port> \
     --list --command-config /tmp/destination.config 
  2. 执行以下命令,查看Cluster Linking详情。

    kafka-configs --describe --cluster-link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config
  3. 执行以下命令,将Mirror Topic转换为普通Topic。

    kafka-mirrors --promote --topics test-topic \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    预期输出:

    Calculating max offset and ms lag for mirror topics: [test-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [test-topic]
    Request for stopping topic test-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
  4. 执行以下命令,删除Cluster Linking。

    kafka-cluster-links --delete --link test-cluster-link \
    --bootstrap-server <destination-cluster-address:port> \
    --command-config /tmp/destination.config

    预期输出:

    Cluster link 'test-cluster-link' deletion successfully completed.

相关文档